/*
 * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
 * or more contributor license agreements. Licensed under the Elastic License;
 * you may not use this file except in compliance with the Elastic License.
 */
package org.elasticsearch.xpack.upgrade;

import com.carrotsearch.hppc.cursors.ObjectCursor;

import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ResourceAlreadyExistsException;
import org.elasticsearch.Version;
import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsResponse;
import org.elasticsearch.action.admin.indices.alias.get.GetAliasesResponse;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.analysis.common.CommonAnalysisPlugin;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.metadata.AliasMetaData;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.allocation.decider.EnableAllocationDecider;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.reindex.BulkByScrollResponse;
import org.elasticsearch.index.reindex.ReindexPlugin;
import org.elasticsearch.indices.InvalidIndexNameException;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.protocol.xpack.migration.UpgradeActionRequired;
import org.elasticsearch.script.MockScriptPlugin;
import org.elasticsearch.script.Script;
import org.elasticsearch.script.ScriptType;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.test.ESIntegTestCase.ClusterScope;
import org.elasticsearch.test.ESIntegTestCase.Scope;
import org.elasticsearch.xpack.core.LocalStateCompositeXPackPlugin;
import org.elasticsearch.xpack.core.upgrade.IndexUpgradeCheckVersion;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

import static org.elasticsearch.test.VersionUtils.randomVersionBetween;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertThrows;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.startsWith;
import static org.hamcrest.core.IsEqual.equalTo;

@ClusterScope(scope=Scope.TEST)
public class InternalIndexReindexerIT extends IndexUpgradeIntegTestCase {

    @Override
    protected Collection<Class<? extends Plugin>> nodePlugins() {
        return Arrays.asList(LocalStateCompositeXPackPlugin.class,
                             ReindexPlugin.class, CustomScriptPlugin.class, CommonAnalysisPlugin.class);
    }

    public static class CustomScriptPlugin extends MockScriptPlugin {
        @Override
        protected Map<String, Function<Map<String, Object>, Object>> pluginScripts() {
            Map<String, Function<Map<String, Object>, Object>> scripts = new HashMap<>();
            scripts.put("add_bar", map -> {
                @SuppressWarnings("unchecked") Map<String, Object> ctx = (Map<String, Object>) map.get("ctx");
                ctx.put("_id", "bar" + "-" + ctx.get("_id"));
                @SuppressWarnings("unchecked") Map<String, Object> source = (Map<String, Object>) ctx.get("_source");
                source.put("bar", true);
                return null;
            });
            scripts.put("fail", map -> {
                throw new RuntimeException("Stop reindexing");
            });
            return scripts;
        }
    }

    public void testUpgradeIndex() throws Exception {
        createTestIndex("test");
        InternalIndexReindexer reindexer = createIndexReindexer(script("add_bar"), Strings.EMPTY_ARRAY);
        PlainActionFuture<BulkByScrollResponse> future = PlainActionFuture.newFuture();
        reindexer.upgrade(new TaskId("abc", 123), "test", clusterState(), future);
        BulkByScrollResponse response = future.actionGet();
        assertThat(response.getCreated(), equalTo(2L));

        SearchResponse searchResponse = client().prepareSearch("test-" + IndexUpgradeCheckVersion.UPGRADE_VERSION).get();
        assertThat(searchResponse.getHits().getTotalHits().value, equalTo(2L));
        assertThat(searchResponse.getHits().getHits().length, equalTo(2));
        for (SearchHit hit : searchResponse.getHits().getHits()) {
            assertThat(hit.getId(), startsWith("bar-"));
            assertThat(hit.getSourceAsMap(), notNullValue());
            assertThat(hit.getSourceAsMap().get("bar"), equalTo(true));
        }

        GetAliasesResponse aliasesResponse = client().admin().indices().prepareGetAliases("test").get();
        assertThat(aliasesResponse.getAliases().size(), equalTo(1));
        List<AliasMetaData> testAlias = aliasesResponse.getAliases().get("test-" + IndexUpgradeCheckVersion.UPGRADE_VERSION);
        assertNotNull(testAlias);
        assertThat(testAlias.size(), equalTo(1));
        assertThat(testAlias.get(0).alias(), equalTo("test"));
    }

    public void testTargetIndexExists() throws Exception {
        createTestIndex("test");
        createTestIndex("test-" + IndexUpgradeCheckVersion.UPGRADE_VERSION);
        InternalIndexReindexer reindexer = createIndexReindexer(script("add_bar"), Strings.EMPTY_ARRAY);
        PlainActionFuture<BulkByScrollResponse> future = PlainActionFuture.newFuture();
        reindexer.upgrade(new TaskId("abc", 123), "test", clusterState(), future);
        assertThrows(future, ResourceAlreadyExistsException.class);

        // Make sure that the index is not marked as read-only
        client().prepareIndex("test", "doc").setSource("foo", "bar").get();
    }

    public void testTargetIndexExistsAsAlias() throws Exception {
        createTestIndex("test");
        createTestIndex("test-foo");
        client().admin().indices().prepareAliases().addAlias("test-foo", "test-" + IndexUpgradeCheckVersion.UPGRADE_VERSION).get();
        InternalIndexReindexer reindexer = createIndexReindexer(script("add_bar"), Strings.EMPTY_ARRAY);
        PlainActionFuture<BulkByScrollResponse> future = PlainActionFuture.newFuture();
        reindexer.upgrade(new TaskId("abc", 123), "test", clusterState(), future);
        assertThrows(future, InvalidIndexNameException.class);

        // Make sure that the index is not marked as read-only
        client().prepareIndex("test-" + IndexUpgradeCheckVersion.UPGRADE_VERSION, "doc").setSource("foo", "bar").get();
    }

    public void testSourceIndexIsReadonly() throws Exception {
        createTestIndex("test");
        try {
            Settings settings = Settings.builder().put(IndexMetaData.INDEX_READ_ONLY_SETTING.getKey(), true).build();
            assertAcked(client().admin().indices().prepareUpdateSettings("test").setSettings(settings).get());
            InternalIndexReindexer reindexer = createIndexReindexer(script("add_bar"), Strings.EMPTY_ARRAY);
            PlainActionFuture<BulkByScrollResponse> future = PlainActionFuture.newFuture();
            reindexer.upgrade(new TaskId("abc", 123), "test", clusterState(), future);
            assertThrows(future, IllegalStateException.class);

            // Make sure that the index is still marked as read-only
            assertThrows(client().prepareIndex("test", "doc").setSource("foo", "bar"), ClusterBlockException.class);
        } finally {
            // Clean up the readonly index
            Settings settings = Settings.builder().put(IndexMetaData.INDEX_READ_ONLY_SETTING.getKey(), false).build();
            assertAcked(client().admin().indices().prepareUpdateSettings("test").setSettings(settings).get());
        }
    }

    public void testReindexingFailureWithClusterRoutingAllocationDisabled() throws Exception {
        createTestIndex("test");

        Settings settings = Settings.builder().put(EnableAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ENABLE_SETTING.getKey(), "none")
                .build();
        ClusterUpdateSettingsResponse clusterUpdateResponse = client().admin().cluster().prepareUpdateSettings()
                .setTransientSettings(settings).get();
        assertThat(clusterUpdateResponse.isAcknowledged(), is(true));
        assertThat(clusterUpdateResponse.getTransientSettings()
                .get(EnableAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ENABLE_SETTING.getKey()), is("none"));

        InternalIndexReindexer reindexer = createIndexReindexer(script("add_bar"), Strings.EMPTY_ARRAY);
        PlainActionFuture<BulkByScrollResponse> future = PlainActionFuture.newFuture();
        reindexer.upgrade(new TaskId("abc", 123), "test", clusterState(), future);
        ElasticsearchException e = expectThrows(ElasticsearchException.class, () -> future.actionGet());
        assertThat(e.getMessage(), containsString(
                "pre-upgrade check failed, please enable cluster routing allocation using setting [cluster.routing.allocation.enable]"));
    }

    public void testReindexingFailure() throws Exception {
        createTestIndex("test");
        // Make sure that the index is not marked as read-only
        client().prepareIndex("test", "doc").setSource("foo", "bar").setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE).get();
        InternalIndexReindexer reindexer = createIndexReindexer(script("fail"), Strings.EMPTY_ARRAY);
        PlainActionFuture<BulkByScrollResponse> future = PlainActionFuture.newFuture();
        reindexer.upgrade(new TaskId("abc", 123), "test", clusterState(), future);
        assertThrows(future, RuntimeException.class);

        // Make sure that the index is not marked as read-only
        client().prepareIndex("test", "doc").setSource("foo", "bar").get();
    }

    public void testMixedNodeVersion() throws Exception {
        createTestIndex("test");

        InternalIndexReindexer reindexer = createIndexReindexer(script("add_bar"), Strings.EMPTY_ARRAY);
        PlainActionFuture<BulkByScrollResponse> future = PlainActionFuture.newFuture();
        reindexer.upgrade(new TaskId("abc", 123), "test", withRandomOldNode(), future);
        assertThrows(future, IllegalStateException.class);

        // Make sure that the index is not marked as read-only
        client().prepareIndex("test_v123", "doc").setSource("foo", "bar").get();
    }

    private void createTestIndex(String indexName) throws Exception {
        assertAcked(client().admin().indices().prepareCreate(indexName).get());
        indexRandom(true,
                client().prepareIndex(indexName, "doc", "1").setSource("{\"foo\":\"bar1-1\"}", XContentType.JSON),
                client().prepareIndex(indexName, "doc", "2").setSource("{\"foo\":\"baz1-1\"}", XContentType.JSON)
        );
        ensureYellow(indexName);
    }

    private Script script(String name) {
        return new Script(ScriptType.INLINE, CustomScriptPlugin.NAME, name, new HashMap<>());
    }

    private InternalIndexReindexer createIndexReindexer(Script transformScript, String[] types) {
        return new IndexUpgradeCheck("test", imd -> UpgradeActionRequired.UPGRADE, client(),
                internalCluster().clusterService(internalCluster().getMasterName()), types, transformScript).getInternalIndexReindexer();
    }

    private ClusterState clusterState() {
        return clusterService().state();
    }

    private ClusterState withRandomOldNode() {
        ClusterState clusterState = clusterState();
        DiscoveryNodes discoveryNodes = clusterState.nodes();
        List<String> nodes = new ArrayList<>();
        for (ObjectCursor<String> key : discoveryNodes.getMasterAndDataNodes().keys()) {
            nodes.add(key.value);
        }
        // Fake one of the node versions
        String nodeId = randomFrom(nodes);
        DiscoveryNode node = discoveryNodes.get(nodeId);
        DiscoveryNode newNode = new DiscoveryNode(node.getName(), node.getId(), node.getEphemeralId(), node.getHostName(),
                node.getHostAddress(), node.getAddress(), node.getAttributes(), node.getRoles(),
                randomVersionBetween(random(), Version.V_6_0_0, Version.V_6_4_0));

        return ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder(discoveryNodes).remove(node).add(newNode)).build();

    }
}
